{
 "cells": [
  {
   "cell_type": "code",
   "execution_count": 2,
   "metadata": {},
   "outputs": [],
   "source": [
    "from pyspark.sql import SparkSession\n",
    "\n",
    "spark = SparkSession.builder.appName('ops').getOrCreate()\n",
    "df = spark.read.csv('appl_stock.csv', inferSchema=True, header=True)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "### Basic Operations"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 4,
   "metadata": {},
   "outputs": [
    {
     "data": {
      "text/plain": [
       "[Row(Date=datetime.datetime(2010, 1, 4, 0, 0), Open=213.429998, High=214.499996, Low=212.38000099999996, Close=214.009998, Volume=123432400, Adj Close=27.727039),\n",
       " Row(Date=datetime.datetime(2010, 1, 5, 0, 0), Open=214.599998, High=215.589994, Low=213.249994, Close=214.379993, Volume=150476200, Adj Close=27.774976000000002)]"
      ]
     },
     "execution_count": 4,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "df.take(2)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 5,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "root\n",
      " |-- Date: timestamp (nullable = true)\n",
      " |-- Open: double (nullable = true)\n",
      " |-- High: double (nullable = true)\n",
      " |-- Low: double (nullable = true)\n",
      " |-- Close: double (nullable = true)\n",
      " |-- Volume: integer (nullable = true)\n",
      " |-- Adj Close: double (nullable = true)\n",
      "\n"
     ]
    }
   ],
   "source": [
    "df.printSchema()"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 8,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "+-------------------+----------+----------+------------------+----------+---------+------------------+\n",
      "|               Date|      Open|      High|               Low|     Close|   Volume|         Adj Close|\n",
      "+-------------------+----------+----------+------------------+----------+---------+------------------+\n",
      "|2010-01-04 00:00:00|213.429998|214.499996|212.38000099999996|214.009998|123432400|         27.727039|\n",
      "|2010-01-05 00:00:00|214.599998|215.589994|        213.249994|214.379993|150476200|27.774976000000002|\n",
      "|2010-01-06 00:00:00|214.379993|    215.23|        210.750004|210.969995|138040000|27.333178000000004|\n",
      "+-------------------+----------+----------+------------------+----------+---------+------------------+\n",
      "only showing top 3 rows\n",
      "\n"
     ]
    }
   ],
   "source": [
    "df.show(3)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 16,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "+------------------+------------------+\n",
      "|              High|               Low|\n",
      "+------------------+------------------+\n",
      "|        214.499996|212.38000099999996|\n",
      "|        215.589994|        213.249994|\n",
      "|            215.23|        210.750004|\n",
      "|        212.000006|        209.050005|\n",
      "|        212.000006|209.06000500000002|\n",
      "|        213.000002|        208.450005|\n",
      "|209.76999500000002|        206.419998|\n",
      "|210.92999500000002|        204.099998|\n",
      "|210.45999700000002|        209.020004|\n",
      "|211.59999700000003|        205.869999|\n",
      "|215.18999900000003|        207.240004|\n",
      "|        215.549994|        209.500002|\n",
      "|213.30999599999998|        207.210003|\n",
      "|        207.499996|            197.16|\n",
      "|        204.699999|        200.190002|\n",
      "|        213.710005|        202.580004|\n",
      "|            210.58|        199.530001|\n",
      "|        205.500004|        198.699995|\n",
      "|        202.199995|        190.250002|\n",
      "|             196.0|191.29999899999999|\n",
      "+------------------+------------------+\n",
      "only showing top 20 rows\n",
      "\n"
     ]
    }
   ],
   "source": [
    "df.filter('close < 500').select(['High', 'Low']).show()"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 17,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "+------------------+------------------+\n",
      "|              High|               Low|\n",
      "+------------------+------------------+\n",
      "|        214.499996|212.38000099999996|\n",
      "|        215.589994|        213.249994|\n",
      "|            215.23|        210.750004|\n",
      "|        212.000006|        209.050005|\n",
      "|        212.000006|209.06000500000002|\n",
      "|        213.000002|        208.450005|\n",
      "|209.76999500000002|        206.419998|\n",
      "|210.92999500000002|        204.099998|\n",
      "|210.45999700000002|        209.020004|\n",
      "|211.59999700000003|        205.869999|\n",
      "|215.18999900000003|        207.240004|\n",
      "|        215.549994|        209.500002|\n",
      "|213.30999599999998|        207.210003|\n",
      "|        207.499996|            197.16|\n",
      "|        204.699999|        200.190002|\n",
      "|        213.710005|        202.580004|\n",
      "|            210.58|        199.530001|\n",
      "|        205.500004|        198.699995|\n",
      "|        202.199995|        190.250002|\n",
      "|             196.0|191.29999899999999|\n",
      "+------------------+------------------+\n",
      "only showing top 20 rows\n",
      "\n"
     ]
    }
   ],
   "source": [
    "df.filter(df['close'] < 500).select(['High', 'Low']).show()"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 20,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "+-------------------+------------------+------------------+------------------+------------------+---------+------------------+\n",
      "|               Date|              Open|              High|               Low|             Close|   Volume|         Adj Close|\n",
      "+-------------------+------------------+------------------+------------------+------------------+---------+------------------+\n",
      "|2010-01-04 00:00:00|        213.429998|        214.499996|212.38000099999996|        214.009998|123432400|         27.727039|\n",
      "|2010-01-05 00:00:00|        214.599998|        215.589994|        213.249994|        214.379993|150476200|27.774976000000002|\n",
      "|2010-01-06 00:00:00|        214.379993|            215.23|        210.750004|        210.969995|138040000|27.333178000000004|\n",
      "|2010-01-07 00:00:00|            211.75|        212.000006|        209.050005|            210.58|119282800|          27.28265|\n",
      "|2010-01-08 00:00:00|        210.299994|        212.000006|209.06000500000002|211.98000499999998|111902700|         27.464034|\n",
      "|2010-01-11 00:00:00|212.79999700000002|        213.000002|        208.450005|210.11000299999998|115557400|         27.221758|\n",
      "|2010-01-12 00:00:00|209.18999499999998|209.76999500000002|        206.419998|        207.720001|148614900|          26.91211|\n",
      "|2010-01-13 00:00:00|        207.870005|210.92999500000002|        204.099998|        210.650002|151473000|          27.29172|\n",
      "|2010-01-14 00:00:00|210.11000299999998|210.45999700000002|        209.020004|            209.43|108223500|         27.133657|\n",
      "|2010-01-15 00:00:00|210.92999500000002|211.59999700000003|        205.869999|            205.93|148516900|26.680197999999997|\n",
      "|2010-01-19 00:00:00|        208.330002|215.18999900000003|        207.240004|        215.039995|182501900|27.860484999999997|\n",
      "|2010-01-20 00:00:00|        214.910006|        215.549994|        209.500002|            211.73|153038200|         27.431644|\n",
      "|2010-01-21 00:00:00|        212.079994|213.30999599999998|        207.210003|        208.069996|152038600|         26.957455|\n",
      "|2010-01-25 00:00:00|202.51000200000001|        204.699999|        200.190002|        203.070002|266424900|26.309658000000002|\n",
      "|2010-01-26 00:00:00|205.95000100000001|        213.710005|        202.580004|        205.940001|466777500|         26.681494|\n",
      "|2010-01-27 00:00:00|        206.849995|            210.58|        199.530001|        207.880005|430642100|26.932840000000002|\n",
      "|2010-02-16 00:00:00|        201.940002|        203.690002|        201.520006|        203.399996|135934400|         26.352412|\n",
      "|2010-02-17 00:00:00|        204.190001|        204.310003|        200.860004|        202.550003|109099200|         26.242287|\n",
      "|2010-02-18 00:00:00|        201.629995|        203.889994|        200.920006|        202.929998|105706300|         26.291519|\n",
      "|2010-02-19 00:00:00|        201.860001|        203.200005|        201.109997|        201.669996|103867400|26.128273999999998|\n",
      "+-------------------+------------------+------------------+------------------+------------------+---------+------------------+\n",
      "only showing top 20 rows\n",
      "\n"
     ]
    }
   ],
   "source": [
    "df.filter((df['Open'] > 200) & ~(df['Close'] < 200)).show()"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 21,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "+-------------------+------------------+----------+------+------+---------+---------+\n",
      "|               Date|              Open|      High|   Low| Close|   Volume|Adj Close|\n",
      "+-------------------+------------------+----------+------+------+---------+---------+\n",
      "|2010-01-22 00:00:00|206.78000600000001|207.499996|197.16|197.75|220441900|25.620401|\n",
      "+-------------------+------------------+----------+------+------+---------+---------+\n",
      "\n"
     ]
    }
   ],
   "source": [
    "df.filter(df['Low'] == 197.16).show()"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 22,
   "metadata": {},
   "outputs": [
    {
     "data": {
      "text/plain": [
       "[Row(Date=datetime.datetime(2010, 1, 22, 0, 0), Open=206.78000600000001, High=207.499996, Low=197.16, Close=197.75, Volume=220441900, Adj Close=25.620401)]"
      ]
     },
     "execution_count": 22,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "df.filter(df['Low'] == 197.16).collect()"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 25,
   "metadata": {},
   "outputs": [],
   "source": [
    "result = df.filter(df['Low'] == 197.16).collect()"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 26,
   "metadata": {},
   "outputs": [
    {
     "data": {
      "text/plain": [
       "[Row(Date=datetime.datetime(2010, 1, 22, 0, 0), Open=206.78000600000001, High=207.499996, Low=197.16, Close=197.75, Volume=220441900, Adj Close=25.620401)]"
      ]
     },
     "execution_count": 26,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "result"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 29,
   "metadata": {},
   "outputs": [
    {
     "data": {
      "text/plain": [
       "220441900"
      ]
     },
     "execution_count": 29,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "row = result[0]\n",
    "row.asDict()['Volume']"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 30,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "+-------+-------+-----+\n",
      "|Company| Person|Sales|\n",
      "+-------+-------+-----+\n",
      "|   GOOG|    Sam|200.0|\n",
      "|   GOOG|Charlie|120.0|\n",
      "|   GOOG|  Frank|340.0|\n",
      "+-------+-------+-----+\n",
      "only showing top 3 rows\n",
      "\n"
     ]
    }
   ],
   "source": [
    "df = spark.read.csv('sales_info.csv', inferSchema=True, header=True)\n",
    "df.show(3)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 31,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "root\n",
      " |-- Company: string (nullable = true)\n",
      " |-- Person: string (nullable = true)\n",
      " |-- Sales: double (nullable = true)\n",
      "\n"
     ]
    }
   ],
   "source": [
    "df.printSchema()"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "### Groupby and Aggregate"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 32,
   "metadata": {},
   "outputs": [
    {
     "data": {
      "text/plain": [
       "<pyspark.sql.group.GroupedData at 0x7f5a0612ad30>"
      ]
     },
     "execution_count": 32,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "df.groupBy('Company')"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 34,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "+-------+-----------------+\n",
      "|company|       avg(Sales)|\n",
      "+-------+-----------------+\n",
      "|   APPL|            370.0|\n",
      "|   GOOG|            220.0|\n",
      "|     FB|            610.0|\n",
      "|   MSFT|322.3333333333333|\n",
      "+-------+-----------------+\n",
      "\n"
     ]
    }
   ],
   "source": [
    "df.groupBy('Company').mean().show()"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 35,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "+-------+----------+\n",
      "|Company|sum(Sales)|\n",
      "+-------+----------+\n",
      "|   APPL|    1480.0|\n",
      "|   GOOG|     660.0|\n",
      "|     FB|    1220.0|\n",
      "|   MSFT|     967.0|\n",
      "+-------+----------+\n",
      "\n"
     ]
    }
   ],
   "source": [
    "df.groupBy('Company').sum().show()"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 37,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "+-------+----------+\n",
      "|Company|min(Sales)|\n",
      "+-------+----------+\n",
      "|   APPL|     130.0|\n",
      "|   GOOG|     120.0|\n",
      "|     FB|     350.0|\n",
      "|   MSFT|     124.0|\n",
      "+-------+----------+\n",
      "\n"
     ]
    }
   ],
   "source": [
    "df.groupBy('Company').min().show()"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 38,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "+-------+-----+\n",
      "|Company|count|\n",
      "+-------+-----+\n",
      "|   APPL|    4|\n",
      "|   GOOG|    3|\n",
      "|     FB|    2|\n",
      "|   MSFT|    3|\n",
      "+-------+-----+\n",
      "\n"
     ]
    }
   ],
   "source": [
    "df.groupBy('Company').count().show()"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 41,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "+----------+\n",
      "|min(Sales)|\n",
      "+----------+\n",
      "|     120.0|\n",
      "+----------+\n",
      "\n"
     ]
    }
   ],
   "source": [
    "df.agg({'Sales': 'min'}).show()"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 42,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "+----------+\n",
      "|sum(Sales)|\n",
      "+----------+\n",
      "|    4327.0|\n",
      "+----------+\n",
      "\n"
     ]
    }
   ],
   "source": [
    "df.agg({'Sales': 'sum'}).show()"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 43,
   "metadata": {},
   "outputs": [],
   "source": [
    "group_data = df.groupBy('Company')"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 47,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "+-------+----------+\n",
      "|Company|sum(Sales)|\n",
      "+-------+----------+\n",
      "|   APPL|    1480.0|\n",
      "|   GOOG|     660.0|\n",
      "|     FB|    1220.0|\n",
      "|   MSFT|     967.0|\n",
      "+-------+----------+\n",
      "\n"
     ]
    }
   ],
   "source": [
    "group_data.agg({'Sales': 'sum'}).show()"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 49,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "+---------------------+\n",
      "|count(DISTINCT Sales)|\n",
      "+---------------------+\n",
      "|                   11|\n",
      "+---------------------+\n",
      "\n"
     ]
    }
   ],
   "source": [
    "from pyspark.sql.functions import countDistinct, avg, stddev\n",
    "df.select(countDistinct('Sales')).show()"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 52,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "+-----------------+\n",
      "|    Average Sales|\n",
      "+-----------------+\n",
      "|360.5833333333333|\n",
      "+-----------------+\n",
      "\n"
     ]
    }
   ],
   "source": [
    "df.select(avg('Sales').alias('Average Sales')).show()"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 50,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "+-----------------------+\n",
      "|count(DISTINCT Company)|\n",
      "+-----------------------+\n",
      "|                      4|\n",
      "+-----------------------+\n",
      "\n"
     ]
    }
   ],
   "source": [
    "df.select(countDistinct('Company')).show()"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 53,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "+------------------+\n",
      "|stddev_samp(Sales)|\n",
      "+------------------+\n",
      "|250.08742410799007|\n",
      "+------------------+\n",
      "\n"
     ]
    }
   ],
   "source": [
    "df.select(stddev('Sales')).show()"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 55,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "+------------------+\n",
      "|               std|\n",
      "+------------------+\n",
      "|250.08742410799007|\n",
      "+------------------+\n",
      "\n"
     ]
    }
   ],
   "source": [
    "df.select(stddev('Sales').alias('std')).show()"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 58,
   "metadata": {},
   "outputs": [],
   "source": [
    "from pyspark.sql.functions import format_number\n",
    "sales_std = df.select(stddev('Sales').alias('std'))"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 59,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "+------------------+\n",
      "|               std|\n",
      "+------------------+\n",
      "|250.08742410799007|\n",
      "+------------------+\n",
      "\n"
     ]
    }
   ],
   "source": [
    "sales_std.show()"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 63,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "+------+\n",
      "|   std|\n",
      "+------+\n",
      "|250.09|\n",
      "+------+\n",
      "\n"
     ]
    }
   ],
   "source": [
    "sales_std.select(format_number('std', 2).alias('std')).show()"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 64,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "+-------+-------+-----+\n",
      "|Company| Person|Sales|\n",
      "+-------+-------+-----+\n",
      "|   GOOG|    Sam|200.0|\n",
      "|   GOOG|Charlie|120.0|\n",
      "|   GOOG|  Frank|340.0|\n",
      "|   MSFT|   Tina|600.0|\n",
      "|   MSFT|    Amy|124.0|\n",
      "|   MSFT|Vanessa|243.0|\n",
      "|     FB|   Carl|870.0|\n",
      "|     FB|  Sarah|350.0|\n",
      "|   APPL|   John|250.0|\n",
      "|   APPL|  Linda|130.0|\n",
      "|   APPL|   Mike|750.0|\n",
      "|   APPL|  Chris|350.0|\n",
      "+-------+-------+-----+\n",
      "\n"
     ]
    }
   ],
   "source": [
    "df.show()"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 68,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "+-------+-------+-----+\n",
      "|Company| Person|Sales|\n",
      "+-------+-------+-----+\n",
      "|   GOOG|Charlie|120.0|\n",
      "|   MSFT|    Amy|124.0|\n",
      "|   APPL|  Linda|130.0|\n",
      "|   GOOG|    Sam|200.0|\n",
      "|   MSFT|Vanessa|243.0|\n",
      "|   APPL|   John|250.0|\n",
      "|   GOOG|  Frank|340.0|\n",
      "|     FB|  Sarah|350.0|\n",
      "|   APPL|  Chris|350.0|\n",
      "|   MSFT|   Tina|600.0|\n",
      "|   APPL|   Mike|750.0|\n",
      "|     FB|   Carl|870.0|\n",
      "+-------+-------+-----+\n",
      "\n"
     ]
    }
   ],
   "source": [
    "df.orderBy('Sales').show()"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 70,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "+-------+-------+-----+\n",
      "|Company| Person|Sales|\n",
      "+-------+-------+-----+\n",
      "|     FB|   Carl|870.0|\n",
      "|   APPL|   Mike|750.0|\n",
      "|   MSFT|   Tina|600.0|\n",
      "|     FB|  Sarah|350.0|\n",
      "|   APPL|  Chris|350.0|\n",
      "|   GOOG|  Frank|340.0|\n",
      "|   APPL|   John|250.0|\n",
      "|   MSFT|Vanessa|243.0|\n",
      "|   GOOG|    Sam|200.0|\n",
      "|   APPL|  Linda|130.0|\n",
      "|   MSFT|    Amy|124.0|\n",
      "|   GOOG|Charlie|120.0|\n",
      "+-------+-------+-----+\n",
      "\n"
     ]
    }
   ],
   "source": [
    "df.orderBy(df['Sales'].desc()).show()"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "### Missing Value"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 71,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "+----+-----+-----+\n",
      "|  Id| Name|Sales|\n",
      "+----+-----+-----+\n",
      "|emp1| John| null|\n",
      "|emp2| null| null|\n",
      "|emp3| null|345.0|\n",
      "|emp4|Cindy|456.0|\n",
      "+----+-----+-----+\n",
      "\n"
     ]
    }
   ],
   "source": [
    "df = spark.read.csv('ContainsNull.csv', inferSchema = True, header=True)\n",
    "df.show()"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 73,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "+----+-----+-----+\n",
      "|  Id| Name|Sales|\n",
      "+----+-----+-----+\n",
      "|emp4|Cindy|456.0|\n",
      "+----+-----+-----+\n",
      "\n"
     ]
    }
   ],
   "source": [
    "df.na.drop().show()"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 77,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "+----+-----+-----+\n",
      "|  Id| Name|Sales|\n",
      "+----+-----+-----+\n",
      "|emp1| John| null|\n",
      "|emp3| null|345.0|\n",
      "|emp4|Cindy|456.0|\n",
      "+----+-----+-----+\n",
      "\n"
     ]
    }
   ],
   "source": [
    "df.na.drop(thresh=2).show()"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 78,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "+----+-----+-----+\n",
      "|  Id| Name|Sales|\n",
      "+----+-----+-----+\n",
      "|emp1| John| null|\n",
      "|emp2| null| null|\n",
      "|emp3| null|345.0|\n",
      "|emp4|Cindy|456.0|\n",
      "+----+-----+-----+\n",
      "\n"
     ]
    }
   ],
   "source": [
    "df.na.drop(how = 'all').show()"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 79,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "+----+-----+-----+\n",
      "|  Id| Name|Sales|\n",
      "+----+-----+-----+\n",
      "|emp3| null|345.0|\n",
      "|emp4|Cindy|456.0|\n",
      "+----+-----+-----+\n",
      "\n"
     ]
    }
   ],
   "source": [
    "df.na.drop(subset=\"Sales\").show()"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 80,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "+----+-----+-----+\n",
      "|  Id| Name|Sales|\n",
      "+----+-----+-----+\n",
      "|emp1| John| null|\n",
      "|emp4|Cindy|456.0|\n",
      "+----+-----+-----+\n",
      "\n"
     ]
    }
   ],
   "source": [
    "df.na.drop(subset='Name').show()"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 81,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "root\n",
      " |-- Id: string (nullable = true)\n",
      " |-- Name: string (nullable = true)\n",
      " |-- Sales: double (nullable = true)\n",
      "\n"
     ]
    }
   ],
   "source": [
    "df.printSchema()"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 83,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "+----+-----+-----+\n",
      "|  Id| Name|Sales|\n",
      "+----+-----+-----+\n",
      "|emp1| John|  0.0|\n",
      "|emp2| null|  0.0|\n",
      "|emp3| null|345.0|\n",
      "|emp4|Cindy|456.0|\n",
      "+----+-----+-----+\n",
      "\n"
     ]
    }
   ],
   "source": [
    "df.na.fill(0).show()"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 84,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "+----+----------+-----+\n",
      "|  Id|      Name|Sales|\n",
      "+----+----------+-----+\n",
      "|emp1|      John| null|\n",
      "|emp2|fill value| null|\n",
      "|emp3|fill value|345.0|\n",
      "|emp4|     Cindy|456.0|\n",
      "+----+----------+-----+\n",
      "\n"
     ]
    }
   ],
   "source": [
    "df.na.fill('fill value').show()"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 85,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "+----+-------+-----+\n",
      "|  Id|   Name|Sales|\n",
      "+----+-------+-----+\n",
      "|emp1|   John| null|\n",
      "|emp2|no name| null|\n",
      "|emp3|no name|345.0|\n",
      "|emp4|  Cindy|456.0|\n",
      "+----+-------+-----+\n",
      "\n"
     ]
    }
   ],
   "source": [
    "df.na.fill('no name', subset = 'Name').show()"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 89,
   "metadata": {},
   "outputs": [
    {
     "data": {
      "text/plain": [
       "[Row(avg(Sales)=400.5)]"
      ]
     },
     "execution_count": 89,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "from pyspark.sql.functions import mean\n",
    "df.select(mean('Sales')).collect()"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 90,
   "metadata": {},
   "outputs": [
    {
     "data": {
      "text/plain": [
       "[Row(avg(Sales)=400.5)]"
      ]
     },
     "execution_count": 90,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "df.select(mean(df['Sales'])).collect()"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 91,
   "metadata": {},
   "outputs": [],
   "source": [
    "mean_val = df.select(mean(df['Sales'])).collect()"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 98,
   "metadata": {},
   "outputs": [],
   "source": [
    "mean_sales = mean_val[0][0]"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 101,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "+----+-----+-----+\n",
      "|  Id| Name|Sales|\n",
      "+----+-----+-----+\n",
      "|emp1| John|400.5|\n",
      "|emp2| null|400.5|\n",
      "|emp3| null|345.0|\n",
      "|emp4|Cindy|456.0|\n",
      "+----+-----+-----+\n",
      "\n"
     ]
    }
   ],
   "source": [
    "df.na.fill(mean_sales, subset='Sales').show()"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 103,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "+----+-----+-----+\n",
      "|  Id| Name|Sales|\n",
      "+----+-----+-----+\n",
      "|emp1| John|400.5|\n",
      "|emp2| null|400.5|\n",
      "|emp3| null|345.0|\n",
      "|emp4|Cindy|456.0|\n",
      "+----+-----+-----+\n",
      "\n"
     ]
    }
   ],
   "source": [
    "df.na.fill(df.select(mean(df['Sales'])).collect()[0][0]).show()"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "### Dates and Timestamps"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 104,
   "metadata": {},
   "outputs": [],
   "source": [
    "df = spark.read.csv('appl_stock.csv', inferSchema = True, header=True)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 107,
   "metadata": {},
   "outputs": [
    {
     "data": {
      "text/plain": [
       "[Row(Date=datetime.datetime(2010, 1, 4, 0, 0), Open=213.429998, High=214.499996, Low=212.38000099999996, Close=214.009998, Volume=123432400, Adj Close=27.727039)]"
      ]
     },
     "execution_count": 107,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "df.take(1)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 108,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "+-------------------+------------------+\n",
      "|               Date|              Open|\n",
      "+-------------------+------------------+\n",
      "|2010-01-04 00:00:00|        213.429998|\n",
      "|2010-01-05 00:00:00|        214.599998|\n",
      "|2010-01-06 00:00:00|        214.379993|\n",
      "|2010-01-07 00:00:00|            211.75|\n",
      "|2010-01-08 00:00:00|        210.299994|\n",
      "|2010-01-11 00:00:00|212.79999700000002|\n",
      "|2010-01-12 00:00:00|209.18999499999998|\n",
      "|2010-01-13 00:00:00|        207.870005|\n",
      "|2010-01-14 00:00:00|210.11000299999998|\n",
      "|2010-01-15 00:00:00|210.92999500000002|\n",
      "|2010-01-19 00:00:00|        208.330002|\n",
      "|2010-01-20 00:00:00|        214.910006|\n",
      "|2010-01-21 00:00:00|        212.079994|\n",
      "|2010-01-22 00:00:00|206.78000600000001|\n",
      "|2010-01-25 00:00:00|202.51000200000001|\n",
      "|2010-01-26 00:00:00|205.95000100000001|\n",
      "|2010-01-27 00:00:00|        206.849995|\n",
      "|2010-01-28 00:00:00|        204.930004|\n",
      "|2010-01-29 00:00:00|        201.079996|\n",
      "|2010-02-01 00:00:00|192.36999699999998|\n",
      "+-------------------+------------------+\n",
      "only showing top 20 rows\n",
      "\n"
     ]
    }
   ],
   "source": [
    "df.select('Date', 'Open').show()"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 109,
   "metadata": {},
   "outputs": [],
   "source": [
    "from pyspark.sql.functions import (dayofmonth, hour, dayofyear, month,\n",
    "                                  year, weekofyear, format_number, date_format)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 111,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "+----------------+\n",
      "|dayofmonth(Date)|\n",
      "+----------------+\n",
      "|               4|\n",
      "|               5|\n",
      "|               6|\n",
      "|               7|\n",
      "|               8|\n",
      "+----------------+\n",
      "only showing top 5 rows\n",
      "\n"
     ]
    }
   ],
   "source": [
    "df.select(dayofmonth(df['Date'])).show(5)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 115,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "+----------+\n",
      "|hour(Date)|\n",
      "+----------+\n",
      "|         0|\n",
      "|         0|\n",
      "|         0|\n",
      "|         0|\n",
      "|         0|\n",
      "+----------+\n",
      "only showing top 5 rows\n",
      "\n"
     ]
    }
   ],
   "source": [
    "df.select(hour(df['Date'])).show(5)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 117,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "+-----------+\n",
      "|month(Date)|\n",
      "+-----------+\n",
      "|          1|\n",
      "|          1|\n",
      "|          1|\n",
      "|          1|\n",
      "|          1|\n",
      "+-----------+\n",
      "only showing top 5 rows\n",
      "\n"
     ]
    }
   ],
   "source": [
    "df.select(month(df['Date'])).show(5)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 118,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "+----------+\n",
      "|year(Date)|\n",
      "+----------+\n",
      "|      2010|\n",
      "|      2010|\n",
      "|      2010|\n",
      "|      2010|\n",
      "|      2010|\n",
      "+----------+\n",
      "only showing top 5 rows\n",
      "\n"
     ]
    }
   ],
   "source": [
    "df.select(year(df['Date'])).show(5)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 124,
   "metadata": {},
   "outputs": [],
   "source": [
    "new_df = df.withColumn('Year', year(df['Date']))"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 129,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "+----+------------------+\n",
      "|Year|        avg(Close)|\n",
      "+----+------------------+\n",
      "|2015|120.03999980555547|\n",
      "|2013| 472.6348802857143|\n",
      "|2014| 295.4023416507935|\n",
      "|2012| 576.0497195640002|\n",
      "|2016|104.60400786904763|\n",
      "|2010| 259.8424600000002|\n",
      "|2011|364.00432532142867|\n",
      "+----+------------------+\n",
      "\n"
     ]
    }
   ],
   "source": [
    "result = new_df.groupBy('Year').mean().select(['Year', 'avg(Close)'])\n",
    "result.show()"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 132,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "+----+---------------------+\n",
      "|Year|Average Closing Price|\n",
      "+----+---------------------+\n",
      "|2015|   120.03999980555547|\n",
      "|2013|    472.6348802857143|\n",
      "|2014|    295.4023416507935|\n",
      "|2012|    576.0497195640002|\n",
      "|2016|   104.60400786904763|\n",
      "|2010|    259.8424600000002|\n",
      "|2011|   364.00432532142867|\n",
      "+----+---------------------+\n",
      "\n"
     ]
    }
   ],
   "source": [
    "new_result = result.withColumnRenamed('avg(Close)', 'Average Closing Price')\n",
    "new_result.show()"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 134,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "+----+---------------------+\n",
      "|Year|Average Closing Price|\n",
      "+----+---------------------+\n",
      "|2015|               120.04|\n",
      "|2013|               472.63|\n",
      "|2014|               295.40|\n",
      "|2012|               576.05|\n",
      "|2016|               104.60|\n",
      "|2010|               259.84|\n",
      "|2011|               364.00|\n",
      "+----+---------------------+\n",
      "\n"
     ]
    }
   ],
   "source": [
    "new_result.select(['Year', format_number('Average Closing Price', 2).alias('Average Closing Price')]).show()"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": []
  }
 ],
 "metadata": {
  "kernelspec": {
   "display_name": "conda_python3",
   "language": "python",
   "name": "conda_python3"
  },
  "language_info": {
   "codemirror_mode": {
    "name": "ipython",
    "version": 3
   },
   "file_extension": ".py",
   "mimetype": "text/x-python",
   "name": "python",
   "nbconvert_exporter": "python",
   "pygments_lexer": "ipython3",
   "version": "3.6.4"
  }
 },
 "nbformat": 4,
 "nbformat_minor": 2
}
